{
  "metadata" : {
    "name" : "Spark-Timeseries",
    "user_save_timestamp" : "1969-12-31T16:00:00.000Z",
    "auto_save_timestamp" : "1969-12-31T16:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : [ "cloudera % default % https://repository.cloudera.com/artifactory/cloudera-repos % maven" ],
    "customDeps" : [ "com.cloudera.sparkts % sparkts % 0.3.0" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "D059010F6F804B6C959AC03021373AB5"
    },
    "cell_type" : "markdown",
    "source" : "#Spark and Timeseries"
  }, {
    "metadata" : {
      "id" : "43FD915539E34024B7A65DD648021D46"
    },
    "cell_type" : "markdown",
    "source" : "This Notebook implements one of the examples developed by [Sandy Ryza](https://github.com/sryza) which is very interesting for analyzing Time-Series Data with Apache Spark.\n\n[Project _spark-timeseries_ on GitHub](https://github.com/sryza/spark-timeseries)\n\n[Cloudera Blog Post](http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark)\n\nConcretely, it brings an example exhibiting the use of TimeSeriesRDD for loading, cleaning, and filtering stock ticker data."
  }, {
    "metadata" : {
      "id" : "AA0FE52A21824986B51F5E18D75D39EF"
    },
    "cell_type" : "markdown",
    "source" : "<span class=\"alert alert-danger\">The following import requires **JAVA 8**!</span>"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "15663CCDE1F24E41B824317C503F042C"
    },
    "cell_type" : "code",
    "source" : "import java.time.{LocalDateTime, ZoneId, ZonedDateTime}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "55701A9AAD314C33AA228722FD7AB4A8"
    },
    "cell_type" : "code",
    "source" : "import java.sql.Timestamp\n\nimport com.cloudera.sparkts._\nimport com.cloudera.sparkts.stats.TimeSeriesStatisticalTests\n\nimport org.apache.spark.{SparkContext, SparkConf}\nimport org.apache.spark.sql.{DataFrame, Row, SQLContext}\nimport org.apache.spark.sql.types._",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "C435533A02724E7E95B4EFC73B10FD84"
    },
    "cell_type" : "markdown",
    "source" : "* Let's create the SQL Context"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F1576972F95846F592B555040E501B4A"
    },
    "cell_type" : "code",
    "source" : "val sqlContext = new SQLContext(sc)\nimport sqlContext.implicits._\nimport org.apache.spark.sql.functions._",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "70F9453DF1244F9195C71DE3F919B6B0"
    },
    "cell_type" : "markdown",
    "source" : "* Creates a Spark DataFrame of (timestamp, symbol, price) from a tab-separated file of stock ticker data."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "ADA437E64AA641FB831EC4A89CE1FE8B"
    },
    "cell_type" : "code",
    "source" : "def loadObservations(path: String): DataFrame = {\n    val rowRdd = sqlContext.sparkContext.textFile(path).map { line =>\n      val tokens = line.split('\\t')\n      val dt = ZonedDateTime.of(tokens(0).toInt, tokens(1).toInt, tokens(2).toInt, 0, 0, 0, 0,\n        ZoneId.systemDefault())\n      val symbol = tokens(3)\n      val price = tokens(5).toDouble\n      Row(Timestamp.from(dt.toInstant), symbol, price)\n    }\n    val fields = Seq(\n      StructField(\"timestamp\", TimestampType, true),\n      StructField(\"symbol\", StringType, true),\n      StructField(\"price\", DoubleType, true)\n    )\n    val schema = StructType(fields)\n    sqlContext.createDataFrame(rowRdd, schema)\n  }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "0AA0223838264388862E86613367EC65"
    },
    "cell_type" : "code",
    "source" : "val tickerObs = loadObservations(\"./conf/data/ticker.tsv\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "585C206A8AC34DE8847BE58FD27DB763"
    },
    "cell_type" : "markdown",
    "source" : "* Create an daily DateTimeIndex over August and September 2015"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "77C54209464C4681BF66FDDE65EB61D0"
    },
    "cell_type" : "code",
    "source" : "val zone = ZoneId.systemDefault()\nval dtIndex = DateTimeIndex.uniformFromInterval(\n      ZonedDateTime.of(LocalDateTime.parse(\"2015-08-03T00:00:00\"), zone),\n      ZonedDateTime.of(LocalDateTime.parse(\"2015-09-22T00:00:00\"), zone),\n      new BusinessDayFrequency(1))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "79BABF5B76124AE89F41044A6552E1F7"
    },
    "cell_type" : "markdown",
    "source" : "* Align the ticker data on the DateTimeIndex to create a TimeSeriesRDD"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "C8CC3A1264FC4FDC808677F561475B47"
    },
    "cell_type" : "code",
    "source" : "val tickerTsrdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, tickerObs,\n      \"timestamp\", \"symbol\", \"price\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "B6DAC1C1B61E44399092E25D5F2A45FF"
    },
    "cell_type" : "markdown",
    "source" : "* Cache the ticketTSRDD in memory"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "797A3CC5FE134709AC1FA8B2C74853B2"
    },
    "cell_type" : "code",
    "source" : "tickerTsrdd.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "E08BE4CA127640C99B21AF6D71F3D25E"
    },
    "cell_type" : "markdown",
    "source" : "* Count the number of series (number of symbols)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "FF8C1AEF083F4E198765B98955F3959E"
    },
    "cell_type" : "code",
    "source" : "tickerTsrdd.count()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "1F08A5B6A50C4820A9AE19CFF58474FD"
    },
    "cell_type" : "markdown",
    "source" : "* Impute missing values using linear interpolation"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "40F3D5372A5F45AD836C6B5E82B5FA5F"
    },
    "cell_type" : "code",
    "source" : "val filled = tickerTsrdd.fill(\"linear\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "274D28F5950641299B8DA99FBDF8FD68"
    },
    "cell_type" : "markdown",
    "source" : "* Compute return rates"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "87313631BE1F4EAD8CB13E239E2D899E"
    },
    "cell_type" : "code",
    "source" : "val returnRates = filled.returnRates()\n ",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "75CB271A848F46279C3DB8E35B791D23"
    },
    "cell_type" : "markdown",
    "source" : "* Compute Durbin-Watson stats for each series"
  }, {
    "metadata" : {
      "id" : "52BE0CD58906405E8BC21E9D11EEFFFB"
    },
    "cell_type" : "markdown",
    "source" : "_Interlude ($Math$):_\n\nThe Durbin-Watson stat $d$ helps detecting the autocorrelation (LSS, _ $\\exists$ patterns in the timeseries_) of the $residuals$ from a regression analysis. \n\nSince $d$ is approximately equal to $2 * (1 − r)$ and $d \\in [0, 4]$, where $r$ is the sample autocorrelation of the residuals.\n\nRule of thumb:\n* $d << $ indicate successive error terms are **positively correlated**\n* $d < 1$, there may be cause for alarm\n* $d > 2$, successive error terms are **negatively correlated**\n\n_**Positive serial correlation**_ is serial correlation in which a positive error for one observation increases the chances of a positive error for another observation.\n\n_**Negative serial correlation** implies that a positive error for one observation increases the chance of a negative error for another observation and a negative error for one observation increases the chances of a positive error for another._\n\n> [modified] source: Wikipedia page https://en.wikipedia.org/wiki/Durbin%E2%80%93Watson_statistic"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "C05ED0CB582743B2934E075D3D6C67E0"
    },
    "cell_type" : "code",
    "source" : "val dwStats = returnRates.mapValues(TimeSeriesStatisticalTests.dwtest)\n",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "presentation" : {
        "tabs_state" : "{\n  \"tab_id\": \"#tab462362486-0\"\n}",
        "pivot_chart_state" : "{\n  \"hiddenAttributes\": [],\n  \"menuLimit\": 200,\n  \"cols\": [],\n  \"rows\": [],\n  \"vals\": [],\n  \"exclusions\": {},\n  \"inclusions\": {},\n  \"unusedAttrsVertical\": 85,\n  \"autoSortUnusedAttrs\": false,\n  \"inclusionsInfo\": {},\n  \"aggregatorName\": \"Count\",\n  \"rendererName\": \"Table\"\n}"
      },
      "id" : "5391227F775448B2965557DC23E072CB"
    },
    "cell_type" : "code",
    "source" : "val dwStatsDF = dwStats.toDF(\"symbol\", \"d\").orderBy($\"d\".desc)\ndwStatsDF",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "E2D8F7CDB4494A5483E2C18900B5969C"
    },
    "cell_type" : "code",
    "source" : "dwStatsDF.describe()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "E1FD7415B3C242E781E627B057CBEB0E"
    },
    "cell_type" : "code",
    "source" : "dwStats.map(_.swap).min",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "4975F9A34BCD431F8F50F57C35BF0A83"
    },
    "cell_type" : "code",
    "source" : "dwStats.map(_.swap).max",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}